Exploring Data with DataFrames and Spark SQL

In this exercise, you will explore data using the Spark DataFrames API and Spark SQL.

Load Data Using an Explicit Schema

To explore data, you must load it into a programmatic data object such as a DataFrame. If the structure of the data is known ahead of time, you can explicitly specify the schema for the DataFrame.

In this exercise, you will work with data that records details of flights.

In [1]:
from pyspark.sql.types import *

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('wasb:///data/raw-flight-data.csv', schema=flightSchema, header=True)
flights.show()
Starting Spark application
SparkSession available as 'spark'.
+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
|        19|        5|     DL|          15016|        11433|      28|      24|
|        19|        5|     DL|          11193|        12892|      -6|     -11|
|        19|        5|     DL|          10397|        15016|      -1|     -19|
|        19|        5|     DL|          15016|        10397|       0|      -1|
|        19|        5|     DL|          10397|        14869|      15|      24|
|        19|        5|     DL|          10397|        10423|      33|      34|
|        19|        5|     DL|          11278|        10397|     323|     322|
|        19|        5|     DL|          14107|        13487|      -7|     -13|
|        19|        5|     DL|          11433|        11298|      22|      41|
|        19|        5|     DL|          11298|        11433|      40|      20|
|        19|        5|     DL|          11433|        12892|      -2|      -7|
|        19|        5|     DL|          10397|        12451|      71|      75|
|        19|        5|     DL|          12451|        10397|      75|      57|
|        19|        5|     DL|          12953|        10397|      -1|      10|
|        19|        5|     DL|          11433|        12953|      -3|     -10|
|        19|        5|     DL|          10397|        14771|      31|      38|
|        19|        5|     DL|          13204|        10397|       8|      25|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 20 rows
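The effect of an explicit schema - binding each column to a declared type - can be sketched in plain Python. This is an illustrative toy, not PySpark's internals:

```python
# Toy illustration of what an explicit schema does: each declared type
# converts the raw CSV string for its column. (Plain Python for clarity;
# PySpark applies the same idea when reading the file at scale.)
flight_schema = [
    ("DayofMonth", int), ("DayOfWeek", int), ("Carrier", str),
    ("OriginAirportID", int), ("DestAirportID", int),
    ("DepDelay", int), ("ArrDelay", int),
]

def parse_row(raw_fields):
    # Pair each (name, type) in the schema with the matching raw field.
    return {name: cast(value)
            for (name, cast), value in zip(flight_schema, raw_fields)}

row = parse_row(["19", "5", "DL", "11433", "13303", "-3", "1"])
```

With the schema applied, `row["DepDelay"]` is the integer `-3` rather than the string `"-3"`, which is what makes numeric operations on the column possible.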

Infer a Data Schema

If the structure of the data source is unknown, you can have Spark automatically infer the schema.

In this case, you will load data about airports without knowing the schema.

In [2]:
airports = spark.read.csv('wasb:///data/airports.csv', header=True, inferSchema=True)
airports.show()
+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
|     10304|      Aniak|   AK|       Aniak Airport|
|     10754|     Barrow|   AK|Wiley Post/Will R...|
|     10551|     Bethel|   AK|      Bethel Airport|
|     10926|    Cordova|   AK|Merle K Mudhole S...|
|     14709|  Deadhorse|   AK|   Deadhorse Airport|
|     11336| Dillingham|   AK|  Dillingham Airport|
|     11630|  Fairbanks|   AK|Fairbanks Interna...|
|     11997|   Gustavus|   AK|    Gustavus Airport|
|     12523|     Juneau|   AK|Juneau International|
|     12819|  Ketchikan|   AK|Ketchikan Interna...|
|     10245|King Salmon|   AK| King Salmon Airport|
|     10170|     Kodiak|   AK|      Kodiak Airport|
|     13970|   Kotzebue|   AK| Ralph Wien Memorial|
|     13873|       Nome|   AK|        Nome Airport|
|     14256| Petersburg|   AK|Petersburg James ...|
|     14828|      Sitka|   AK|Sitka Rocky Gutie...|
|     12807| St. Mary's|   AK|  St. Mary's Airport|
|     11445|   Unalaska|   AK|    Unalaska Airport|
+----------+-----------+-----+--------------------+
only showing top 20 rows
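Schema inference can be sketched roughly as sampling values from each column and choosing the narrowest type that parses all of them. The following is a toy illustration in plain Python; PySpark's actual inference is more involved:

```python
# Rough sketch of schema inference: try the narrowest type first and fall
# back to a wider one if any sampled value fails to parse. PySpark's
# inferSchema scans the data and does something similar (but more complete).
def infer_type(samples):
    for candidate in (int, float):
        try:
            for value in samples:
                candidate(value)
            return candidate
        except ValueError:
            continue
    return str  # nothing narrower parsed every sample

inferred = infer_type(["10165", "10299", "10304"])  # airport_id samples
```

This is why `airport_id` comes back as an integer column while `city` and `name` remain strings.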

Use DataFrame Methods

Spark DataFrames provide functions that you can use to extract and manipulate data. For example, you can use the select function to return a new DataFrame containing columns selected from an existing DataFrame.

In [3]:
cities = airports.select("city", "name")
cities.show()
+-----------+--------------------+
|       city|                name|
+-----------+--------------------+
|Adak Island|                Adak|
|  Anchorage|Ted Stevens Ancho...|
|      Aniak|       Aniak Airport|
|     Barrow|Wiley Post/Will R...|
|     Bethel|      Bethel Airport|
|    Cordova|Merle K Mudhole S...|
|  Deadhorse|   Deadhorse Airport|
| Dillingham|  Dillingham Airport|
|  Fairbanks|Fairbanks Interna...|
|   Gustavus|    Gustavus Airport|
|     Juneau|Juneau International|
|  Ketchikan|Ketchikan Interna...|
|King Salmon| King Salmon Airport|
|     Kodiak|      Kodiak Airport|
|   Kotzebue| Ralph Wien Memorial|
|       Nome|        Nome Airport|
| Petersburg|Petersburg James ...|
|      Sitka|Sitka Rocky Gutie...|
| St. Mary's|  St. Mary's Airport|
|   Unalaska|    Unalaska Airport|
+-----------+--------------------+
only showing top 20 rows

Combine Operations

You can combine functions in a single statement to perform multiple operations on a DataFrame. In this case, you will use the join function to combine the flights and airports DataFrames, and then use the groupBy and count functions to return the number of flights from each airport.

In [4]:
flightsByOrigin = flights.join(airports, flights.OriginAirportID == airports.airport_id).groupBy("city").count()
flightsByOrigin.show()
+-----------------+------+
|             city| count|
+-----------------+------+
|          Phoenix| 90281|
|            Omaha| 13537|
|        Anchorage|  7777|
|   Raleigh/Durham| 28436|
|           Dallas| 19503|
|          Oakland| 25503|
|      San Antonio| 23090|
|     Philadelphia| 47659|
|       Louisville| 10953|
|      Los Angeles|118684|
|Dallas/Fort Worth|105024|
|       Sacramento| 25193|
|     Indianapolis| 18099|
|        Cleveland| 25261|
|        San Diego| 45783|
|    San Francisco| 84675|
|        Nashville| 34927|
|    Oklahoma City| 13967|
|          Detroit| 62879|
|         Portland| 30640|
+-----------------+------+
only showing top 20 rows
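The join-then-aggregate pattern used above can be sketched with plain Python dicts. The airport IDs and cities below are a made-up sample, not the real data:

```python
# The join: map each flight's OriginAirportID to a city.
# The groupBy/count: count flights per city.
from collections import Counter

airport_city = {101: "Springfield", 102: "Shelbyville", 103: "Ogdenville"}
flight_origins = [101, 101, 102, 103, 101]  # OriginAirportID per flight

flights_by_origin = Counter(airport_city[origin] for origin in flight_origins)
```

Spark performs the same lookup-and-tally, distributed across the cluster instead of in a single dictionary.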

Count the Rows in a DataFrame

Now that you're familiar with working with DataFrames, you can start exploring the data itself. A key task when building predictive solutions is to determine statistics that will help you understand the data before building predictive models. For example, how many rows of flight data do you actually have?

In [5]:
flights.count()
2719418

Determine Summary Statistics

Predictive modeling is based on statistics and probability, so you will often start by looking at summary statistics. The describe function returns a DataFrame containing the count, mean, standard deviation, minimum, and maximum values for each numeric column.

In [6]:
flights.describe().show()
+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|summary|       DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|         DepDelay|         ArrDelay|
+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
|  count|          2719418|           2719418|2719418|           2719418|           2719418|          2691974|          2690385|
|   mean|15.79747468024408|3.8983907586108497|   null| 12742.26441172339|12742.455345592329|10.53686662649788| 6.63768791455498|
| stddev|8.799860168985349|1.9859881390373335|   null|1501.9729397025758|1501.9692528927835|36.09952806643149|38.64881489390086|
|    min|                1|                 1|     9E|             10140|             10140|              -63|              -94|
|    max|               31|                 7|     YV|             15376|             15376|             1863|             1845|
+-------+-----------------+------------------+-------+------------------+------------------+-----------------+-----------------+
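Conceptually, describe computes five statistics per numeric column. A plain-Python sketch on a few toy values follows; note that Spark's stddev is the sample standard deviation, which divides by n - 1:

```python
import math

# The five statistics describe() reports for a numeric column.
def describe_column(values):
    n = len(values)
    mean = sum(values) / n
    # Sample standard deviation: divide the squared deviations by n - 1.
    stddev = math.sqrt(sum((v - mean) ** 2 for v in values) / (n - 1))
    return {"count": n, "mean": mean, "stddev": stddev,
            "min": min(values), "max": max(values)}

stats = describe_column([-3, 0, -4, 28, -6])  # a few toy DepDelay values
```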

Determine the Presence of Duplicates

The data you have to work with won't always be perfect - often you'll need to clean it; for example, by detecting and removing duplicates that might affect your model. You can use the dropDuplicates function to create a new DataFrame with the duplicates removed, enabling you to determine how many rows are duplicates of other rows.

In [7]:
flights.count() - flights.dropDuplicates().count()
22435
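The arithmetic here - the row count before deduplication minus the count after - can be illustrated on a toy sample in plain Python:

```python
# 4 rows, but only 2 distinct rows, so 2 rows are duplicates of others.
rows = [(19, 5, "DL"), (19, 5, "DL"), (20, 6, "AA"), (19, 5, "DL")]
duplicate_count = len(rows) - len(set(rows))
```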

Identify Missing Values

As well as determining whether duplicates exist in your data, you should detect missing values, and either remove rows containing missing data or replace the missing values with a suitable replacement. The dropna function creates a DataFrame with any rows containing missing data removed - you can specify a subset of columns, and whether the row should be removed if any or all of the values are missing. You can then use this new DataFrame to determine how many rows contain missing values.

In [8]:
flights.count() - flights.dropDuplicates().dropna(how="any", subset=["ArrDelay", "DepDelay"]).count()
46233

Clean the Data

Now that you've identified that there are duplicates and missing values, you can clean the data by removing the duplicates and replacing the missing values. The fillna function replaces missing values with a specified replacement value. In this case, you'll remove all duplicate rows and replace missing ArrDelay and DepDelay values with 0.

In [9]:
data = flights.dropDuplicates().fillna(value=0, subset=["ArrDelay", "DepDelay"])
data.count()
2696983
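The cleaning step can be sketched in plain Python on toy (DepDelay, ArrDelay) rows, where None stands in for a missing value:

```python
# Drop duplicate rows (keeping the first occurrence), then replace any
# missing delay value with 0 - the same sequence as dropDuplicates/fillna.
rows = [(-3, 1), (-3, 1), (None, 24), (15, None)]

deduped = list(dict.fromkeys(rows))  # removes the duplicate, keeps order
cleaned = [tuple(0 if v is None else v for v in row) for row in deduped]
```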

Check Summary Statistics

After cleaning the data, you should re-check the statistics - removing rows and changing values may affect the distribution of the data, which in turn could affect any predictive models you might create.

In [10]:
data.describe().show()
+-------+------------------+------------------+-------+------------------+------------------+------------------+------------------+
|summary|        DayofMonth|         DayOfWeek|Carrier|   OriginAirportID|     DestAirportID|          DepDelay|          ArrDelay|
+-------+------------------+------------------+-------+------------------+------------------+------------------+------------------+
|  count|           2696983|           2696983|2696983|           2696983|           2696983|           2696983|           2696983|
|   mean|15.798996508320593| 3.900369412784582|   null|12742.459424846207| 12742.85937657004|10.531134234068217|6.6679285705545785|
| stddev| 8.801267199135452|1.9864582421701986|   null|1502.0359941370616|1501.9939589817977| 36.06172819056575| 38.58386147358074|
|    min|                 1|                 1|     9E|             10140|             10140|               -63|               -94|
|    max|                31|                 7|     YV|             15376|             15376|              1863|              1845|
+-------+------------------+------------------+-------+------------------+------------------+------------------+------------------+

Explore Relationships in the Data

Predictive modeling is largely based on statistical relationships between fields in the data. To design a good model, you need to understand how the data points relate to one another and identify any apparent correlation. The corr function calculates a correlation value between -1 and 1, indicating the strength of correlation between two fields. A strong positive correlation (near 1) indicates that high values for one column are often found with high values for the other, while a strong negative correlation (near -1) indicates that high values for one column are often found with low values for the other. A correlation near 0 indicates little apparent relationship between the fields.

In [11]:
data.corr("DepDelay", "ArrDelay")
0.9392630367706952
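corr computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. A plain-Python sketch on a few toy value pairs:

```python
import math

# Pearson correlation: covariance over the product of standard deviations;
# the result always falls between -1 and 1.
def pearson(xs, ys):
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mean_x) ** 2 for x in xs))
    sy = math.sqrt(sum((y - mean_y) ** 2 for y in ys))
    return cov / (sx * sy)

# Toy (DepDelay, ArrDelay) pairs: late departures tend to arrive late,
# so the coefficient comes out strongly positive.
r = pearson([-3, 0, 28, 71], [1, -8, 24, 75])
```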

Use Spark SQL

In addition to using the DataFrame API directly to query data, you can persist DataFrames as tables and use Spark SQL to query them using the SQL language. SQL is often more intuitive to use when querying tabular data structures.

In [12]:
data.createOrReplaceTempView("flightData")
spark.sql("SELECT DayOfWeek, AVG(ArrDelay) AS AvgDelay FROM flightData GROUP BY DayOfWeek ORDER BY DayOfWeek").show()
+---------+------------------+
|DayOfWeek|          AvgDelay|
+---------+------------------+
|        1| 7.077989660973244|
|        2|  4.39237404158651|
|        3| 7.234625279280266|
|        4|10.775574715480056|
|        5|  8.71110560964396|
|        6|2.1437428120738304|
|        7|  5.25403935972552|
+---------+------------------+
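Because the query is standard SQL, the same aggregation can be reproduced outside Spark - for example with Python's built-in sqlite3 module on a toy sample:

```python
import sqlite3

# Build an in-memory table from a few toy rows and run the same
# GROUP BY / ORDER BY query text that the Spark SQL cell uses.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flightData (DayOfWeek INTEGER, ArrDelay INTEGER)")
conn.executemany("INSERT INTO flightData VALUES (?, ?)",
                 [(1, 10), (1, 4), (2, 6), (2, -2)])
result = conn.execute(
    "SELECT DayOfWeek, AVG(ArrDelay) AS AvgDelay "
    "FROM flightData GROUP BY DayOfWeek ORDER BY DayOfWeek").fetchall()
```

The difference is purely one of scale: Spark executes the query as a distributed job across the cluster, while SQLite runs it in-process.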

Use the Inline SQL Magic

Jupyter Notebooks support magics, which enable you to include inline code and functionality. For example, the %%sql magic enables you to write SQL queries and visualize the results directly in the notebook.

Run the following query, and view the table of results that is returned.

In [13]:
%%sql
SELECT DepDelay, ArrDelay FROM flightData
      DepDelay  ArrDelay
0           -3         3
1            3        21
2           -4        -5
3            1         7
4            3        -4
...        ...       ...
2495       -18       -22
2496         4       -11
2497        -5        -9
2498        -3        15
2499       -10        -8

2500 rows × 2 columns

Change the Table visualization of the results above to a Scatter visualization to see the relationship between the DepDelay and ArrDelay values more clearly (use the - function to plot the actual values). Visualizations like this make it easier to see relationships as apparent structure in the data. For example, the positive correlation between DepDelay and ArrDelay appears to be a linear relationship, creating a diagonal line of plotted points.

Query Multiple Tables

You can create and query multiple temporary tables. Run the cells below to create a temporary table from the airports DataFrame, and then use an inline query to query it together with the flights data.

In [14]:
airports.createOrReplaceTempView("airportData")
In [15]:
%%sql
SELECT a.name, AVG(f.ArrDelay) AS avgdelay
FROM flightData AS f JOIN airportData AS a
ON f.DestAirportID = a.airport_id
GROUP BY a.name
name avgdelay
0 Eppley Airfield 9.023494
1 Kahului Airport 4.454699
2 San Diego International 5.773734
3 Bob Hope 4.499965
4 Hartsfield-Jackson Atlanta International 7.278793
5 Sacramento International 5.809755
6 Chicago O'Hare International 9.796628
7 Will Rogers World 10.492794
8 Ted Stevens Anchorage International -1.384565
9 Raleigh-Durham International 6.549667
10 Minneapolis-St Paul International 2.910012
11 Metropolitan Oakland International 5.143380
12 Norman Y. Mineta San Jose International 5.055080
13 Southwest Florida International 4.072908
14 San Antonio International 8.470014
15 Cincinnati/Northern Kentucky International 4.714567
16 LaGuardia 11.209839
17 William P Hobby 7.540951
18 Philadelphia International 8.517859
19 Miami International 3.578246
20 John F. Kennedy International 10.491479
21 Theodore Francis Green State 8.907451
22 Long Beach Airport 1.392459
23 Honolulu International 5.566452
24 Port Columbus International 9.059603
25 Los Angeles International 5.490520
26 Austin - Bergstrom International 8.741181
27 Newark Liberty International 12.181045
28 Tucson International 7.247501
29 John Wayne Airport-Orange County 1.800238
... ... ...
40 Louis Armstrong New Orleans International 5.660180
41 Ontario International 6.659643
42 Detroit Metro Wayne County 3.907228
43 Salt Lake City International 2.507322
44 Louisville International-Standiford Field 8.779322
45 Ronald Reagan Washington National 6.247987
46 Fort Lauderdale-Hollywood International 6.744434
47 Palm Beach International 7.019600
48 Pittsburgh International 6.832537
49 Memphis International 4.346110
50 Phoenix Sky Harbor International 3.239210
51 Tampa International 5.734449
52 Richmond International 9.473262
53 Dallas/Fort Worth International 5.654730
54 Washington Dulles International 6.967958
55 Nashville International 7.656307
56 Chicago Midway International 7.637624
57 Charlotte Douglas International 5.186660
58 Bradley International 8.519865
59 McCarran International 5.144977
60 Indianapolis International 6.877842
61 Seattle/Tacoma International 2.186325
62 Dallas Love Field 9.379630
63 San Francisco International 13.355709
64 Norfolk International 9.420622
65 Cleveland-Hopkins International 7.592609
66 Luis Munoz Marin International 7.123676
67 Jacksonville International 7.450801
68 Reno/Tahoe International 6.610800
69 Buffalo Niagara International 8.079164

70 rows × 2 columns

As you saw previously, it can sometimes be useful to visualize the results of a query. Change the visualization above to a Bar chart, using the - function, to see the average lateness (or earliness) of flights at all destinations.